
Conversation


@wesm commented Nov 20, 2017

While this will cause some minor API breakage in parquet-cpp and some other downstream users, this is reasonably long overdue. It will permit implementations of the RecordBatch or Table interface that do lazy IO / data loading or lazy materialization of columns.
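For a sense of what the virtual interface enables, here is a minimal, self-contained C++ sketch of a lazily materializing implementation. The stand-in types and the LazyRecordBatch/LoadColumn names are illustrative only, not part of this patch:

#include <memory>
#include <vector>

// Simplified stand-ins so the sketch is self-contained; the real classes
// live in arrow/record_batch.h and arrow/type.h.
struct Array {};
struct Schema { int num_fields() const { return 2; } };

// The shape of the change: column access goes through a virtual method,
// so an implementation can defer IO until a column is actually requested.
class RecordBatchInterface {
 public:
  virtual ~RecordBatchInterface() = default;
  virtual std::shared_ptr<Array> column(int i) const = 0;
};

// Hypothetical lazy implementation: columns are materialized on first
// access and cached afterwards.
class LazyRecordBatch : public RecordBatchInterface {
 public:
  explicit LazyRecordBatch(std::shared_ptr<Schema> schema)
      : schema_(std::move(schema)), cache_(schema_->num_fields()) {}

  std::shared_ptr<Array> column(int i) const override {
    if (!cache_[i]) {
      cache_[i] = LoadColumn(i);  // e.g. read a Parquet column chunk here
    }
    return cache_[i];
  }

 private:
  std::shared_ptr<Array> LoadColumn(int) const {
    return std::make_shared<Array>();  // placeholder for real IO
  }

  std::shared_ptr<Schema> schema_;
  mutable std::vector<std::shared_ptr<Array>> cache_;
};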

I will write a patch to fix up parquet-cpp, and will look to see if glib is easy to fix. There's no good way to go about merging this patch, since a green build is not possible. Once we're happy with the patch, I can merge it and then work on getting a green build in parquet-cpp so we don't have a broken build there for too long.

wesm added 8 commits November 20, 2017 10:28

- Change-Id: I0a252a1c7606b0d98827765029aaa4dce3e445bb (…efactoring)
- Change-Id: I10e2b777beae1adc7dbbe2120d9331c9b37eb4ff (…o set build type in Travis CI)
- Change-Id: I8fe9a8e155c6c37a7a4a5f921eb411d45bd8c5d8
- Change-Id: I180f4c0d2af218b9fca4ff115be19c9dc6c7d9f7
- Change-Id: Ia41fd3d634bb7d0a9090f7758e230b1665df21a3 (…at define)
- Change-Id: I81e9f9a11cd0c43e0d629e404643f95b16a1f75a
- Change-Id: I996c45e66a4b835b2c46003a6bed84da7d446eee
- Change-Id: I87885c88e09617dceee6be3b801bdb5442081955

@wesm commented Nov 20, 2017

@kou I fixed the glib compilation, and I added DCHECKs to the record batch constructor to assert that the schema has the same number of fields as there are columns, but it seems this isn't being checked in the GLib bindings:

TestFileWriter:
  test_write_record_batch:
/home/wesm/code/arrow/cpp/src/arrow/record_batch.cc:39 Check failed: (static_cast<int>(columns.size())) == (schema->num_fields()) 
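Reconstructed from the failure message above, the constructor-time assertion was essentially the following sketch (exact surrounding code elided):

RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
                         std::vector<std::shared_ptr<Array>> columns)
    : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {
  // Debug-build check: the number of columns must match the schema.
  DCHECK_EQ(static_cast<int>(columns_.size()), schema_->num_fields());
}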

Change-Id: If3a168a02443007946dd80f47a4aaa4f9d60b132
const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;

/// \brief Name of the i-th column
const std::string& column_name(int i) const;
A reviewer asked:

Shouldn't this be std::string?

@wesm replied:

The name is coming from the schema, so a copy is not strictly necessary.
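Presumably the accessor just forwards into the schema, which the batch keeps alive, so returning a reference avoids a copy on every call; a sketch:

const std::string& RecordBatch::column_name(int i) const {
  // The Field, and therefore its name, is owned by schema_, so the
  // returned reference stays valid for as long as the batch is alive.
  return schema_->field(i)->name();
}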

Change-Id: I36b935cc45ec2280ab221be14dccfda44f9ccab6

@wesm commented Nov 20, 2017

@kou I removed the DCHECKs from the constructor that I mentioned, in favor of validating in RecordBatch::Validate. The tests are segfaulting, though; I stepped into gdb to look at the failing test, and it looks like a record batch in the test suite might be malformed (the schema has more fields than the actual number of columns).
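The thread doesn't show the new Validate body, but given the error message that later surfaces in the GLib tests ("Number of columns did not match schema"), it plausibly checks something like this sketch, written here as a free function for illustration:

// Sketch only; the real method is RecordBatch::Validate(). It verifies
// that the columns agree with the schema and the declared row count,
// returning a Status instead of crashing.
Status ValidateRecordBatch(const Schema& schema, int64_t num_rows,
                           const std::vector<std::shared_ptr<Array>>& columns) {
  if (static_cast<int>(columns.size()) != schema.num_fields()) {
    return Status::Invalid("Number of columns did not match schema");
  }
  for (int i = 0; i < static_cast<int>(columns.size()); ++i) {
    if (columns[i]->length() != num_rows) {
      return Status::Invalid("Column " + std::to_string(i) +
                             " length does not match the number of rows");
    }
    if (!columns[i]->type()->Equals(schema.field(i)->type())) {
      return Status::Invalid("Column " + std::to_string(i) +
                             " type does not match the schema");
    }
  }
  return Status::OK();
}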


@kou commented Nov 21, 2017

@wesm I confirmed. The test creates a zero-row record batch with no columns, which causes the segmentation fault. The following patch fixes this:

diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb
index 3de8e5cf..67aed85f 100644
--- a/c_glib/test/test-file-writer.rb
+++ b/c_glib/test/test-file-writer.rb
@@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-file-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +42,12 @@ class TestFileWriter < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
index a71a3704..2adf25b3 100644
--- a/c_glib/test/test-gio-input-stream.rb
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -16,15 +16,21 @@
 # under the License.
 
 class TestGIOInputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_reader_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-input-stream")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ class TestGIOInputStream < Test::Unit::TestCase
     input = Arrow::GIOInputStream.new(input_stream)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb
index adaa8c1b..c77598ed 100644
--- a/c_glib/test/test-gio-output-stream.rb
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -16,17 +16,23 @@
 # under the License.
 
 class TestGIOOutputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_writer_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-output-stream")
     file = Gio::File.new_for_path(tempfile.path)
     output_stream = file.append_to(:none)
     output = Arrow::GIOOutputStream.new(output_stream)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ class TestGIOOutputStream < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb
index c3d0e149..32754e20 100644
--- a/c_glib/test/test-stream-writer.rb
+++ b/c_glib/test/test-stream-writer.rb
@@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-stream-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema)
       begin
         columns = [
-          build_boolean_array([true]),
+          build_boolean_array(data),
         ]
-        record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+        record_batch = Arrow::RecordBatch.new(schema, data.size, columns)
         stream_writer.write_record_batch(record_batch)
       ensure
         stream_writer.close
@@ -41,10 +43,12 @@ class TestStreamWriter < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       stream_reader = Arrow::RecordBatchStreamReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    stream_reader.schema.fields.collect(&:name))
-      assert_equal(true,
-                   stream_reader.read_next.get_column(0).get_value(0))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   stream_reader.read_next)
       assert_nil(stream_reader.read_next)
     ensure
       input.close


@kou commented Nov 21, 2017

If we allow empty record batches (record batches with no columns), the following check is needed:

diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 3c1db061..6af9bc4b 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -139,9 +139,11 @@ class RecordBatchSerializer : public ArrayVisitor {
       buffers_.clear();
     }
 
-    // Perform depth-first traversal of the row-batch
-    for (int i = 0; i < batch.num_columns(); ++i) {
-      RETURN_NOT_OK(VisitArray(*batch.column(i)));
+    if (batch.num_rows() > 0) {
+      // Perform depth-first traversal of the row-batch
+      for (int i = 0; i < batch.num_columns(); ++i) {
+        RETURN_NOT_OK(VisitArray(*batch.column(i)));
+      }
     }
 
     // The position for the start of a buffer relative to the passed frame of

I'm OK with denying record batches that have no columns. It'll simplify our code.


@wesm commented Nov 21, 2017

@kou it seems like there are two different issues here.

Here, a schema with 1 field was passed along with a list of 0 columns:

-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])

I believe this would result in segfaults even if the number of rows were non-zero. So having empty / length-0 record batches in the IPC writer code path is fine so long as the columns match the schema.

The reason this bug was not caught before is that the RecordBatch::columns_ member was being used to determine RecordBatch::num_columns(), whereas now we are using the schema. It seems like respecting the schema is the right approach. I could add bounds checking to SimpleRecordBatch::column(i) and return null if the index is out of bounds; would that help at all?
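For reference, the proposed bounds check would amount to something like this sketch (whether to do it in C++ was left open; see the reply below):

std::shared_ptr<Array> SimpleRecordBatch::column(int i) const {
  // Proposed: return null instead of crashing on an out-of-range index;
  // the caller is then responsible for checking the result.
  if (i < 0 || i >= static_cast<int>(columns_.size())) {
    return nullptr;
  }
  return columns_[i];
}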

Change-Id: I23c701d8246faf8669d70a1bf6ce5ce0bc170591
@xhochy left a comment:

+1, LGTM

wesm added 3 commits November 21, 2017 16:09

- Change-Id: Ie205d1fb8c3e05a2b01ecf4c317cfdc1d3acbd24
- Change-Id: I345dfe5aaf7e5adb49ceccf5663d950c21c03558
- Change-Id: I7511a0b63c3f540d4dc688eef5a86f80f09228d0

@wesm commented Nov 22, 2017

Merging, since the Linux build now fails only when it reaches parquet-cpp. I will update the parquet-cpp patch and then merge that once its build passes.

@wesm closed this in fc4e2c3 on Nov 22, 2017
@wesm deleted the ARROW-1808 branch on November 22, 2017 at 00:05

@kou commented Nov 22, 2017

> It seems like respecting the schema is the right approach.

I agree with you.

> I could add bounds checking to SimpleRecordBatch::column(i) and return null if the index is out of bounds; would that help at all?

I think it's better to do that in a higher layer such as the GLib bindings layer; we shouldn't add needless checks in the C++ layer, for simplicity and performance.

I'll add bounds checking in the GLib bindings later. For now, I think it's better to always validate a newly created record batch. If we always validate it, we can assume that all record batches always have valid data.

From d9260c09765b1cd337cda5a09497ee1b985ef623 Mon Sep 17 00:00:00 2001
From: Kouhei Sutou <[email protected]>
Date: Wed, 22 Nov 2017 09:09:30 +0900
Subject: [PATCH] [GLib] Always validate on creating new record batch

---
 c_glib/arrow-glib/record-batch.cpp | 13 ++++++++---
 c_glib/arrow-glib/record-batch.h   |  3 ++-
 c_glib/example/go/write-batch.go   | 10 +++++++--
 c_glib/example/go/write-stream.go  | 10 +++++++--
 c_glib/test/test-record-batch.rb   | 46 +++++++++++++++++++++++++-------------
 5 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index f23a0cf7..73de6eeb 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -135,13 +135,15 @@ garrow_record_batch_class_init(GArrowRecordBatchClass *klass)
  * @schema: The schema of the record batch.
  * @n_rows: The number of the rows in the record batch.
  * @columns: (element-type GArrowArray): The columns in the record batch.
+ * @error: (nullable): Return location for a #GError or %NULL.
  *
- * Returns: A newly created #GArrowRecordBatch.
+ * Returns: (nullable): A newly created #GArrowRecordBatch or %NULL on error.
  */
 GArrowRecordBatch *
 garrow_record_batch_new(GArrowSchema *schema,
                         guint32 n_rows,
-                        GList *columns)
+                        GList *columns,
+                        GError **error)
 {
   std::vector<std::shared_ptr<arrow::Array>> arrow_columns;
   for (GList *node = columns; node; node = node->next) {
@@ -152,7 +154,12 @@ garrow_record_batch_new(GArrowSchema *schema,
   auto arrow_record_batch =
     arrow::RecordBatch::Make(garrow_schema_get_raw(schema),
                              n_rows, arrow_columns);
-  return garrow_record_batch_new_raw(&arrow_record_batch);
+  auto status = arrow_record_batch->Validate();
+  if (garrow_error_check(error, status, "[record-batch][new]")) {
+    return garrow_record_batch_new_raw(&arrow_record_batch);
+  } else {
+    return NULL;
+  }
 }
 
 /**
diff --git a/c_glib/arrow-glib/record-batch.h b/c_glib/arrow-glib/record-batch.h
index 021f894f..823a42bb 100644
--- a/c_glib/arrow-glib/record-batch.h
+++ b/c_glib/arrow-glib/record-batch.h
@@ -68,7 +68,8 @@ GType garrow_record_batch_get_type(void) G_GNUC_CONST;
 
 GArrowRecordBatch *garrow_record_batch_new(GArrowSchema *schema,
                                            guint32 n_rows,
-                                           GList *columns);
+                                           GList *columns,
+                                           GError **error);
 
 gboolean garrow_record_batch_equal(GArrowRecordBatch *record_batch,
                                    GArrowRecordBatch *other_record_batch);
diff --git a/c_glib/example/go/write-batch.go b/c_glib/example/go/write-batch.go
index 9dbc3c00..f4d03ed9 100644
--- a/c_glib/example/go/write-batch.go
+++ b/c_glib/example/go/write-batch.go
@@ -188,7 +188,10 @@ func main() {
 		BuildDoubleArray(),
 	}
 
-	recordBatch := arrow.NewRecordBatch(schema, 4, columns)
+	recordBatch, err := arrow.NewRecordBatch(schema, 4, columns)
+	if err != nil {
+		log.Fatalf("Failed to create record batch #1: %v", err)
+	}
 	_, err = writer.WriteRecordBatch(recordBatch)
 	if err != nil {
 		log.Fatalf("Failed to write record batch #1: %v", err)
@@ -198,7 +201,10 @@ func main() {
 	for i, column := range columns {
 		slicedColumns[i] = column.Slice(1, 3)
 	}
-	recordBatch = arrow.NewRecordBatch(schema, 3, slicedColumns)
+	recordBatch, err = arrow.NewRecordBatch(schema, 3, slicedColumns)
+	if err != nil {
+		log.Fatalf("Failed to create record batch #2: %v", err)
+	}
 	_, err = writer.WriteRecordBatch(recordBatch)
 	if err != nil {
 		log.Fatalf("Failed to write record batch #2: %v", err)
diff --git a/c_glib/example/go/write-stream.go b/c_glib/example/go/write-stream.go
index 244741e8..7225156a 100644
--- a/c_glib/example/go/write-stream.go
+++ b/c_glib/example/go/write-stream.go
@@ -188,7 +188,10 @@ func main() {
 		BuildDoubleArray(),
 	}
 
-	recordBatch := arrow.NewRecordBatch(schema, 4, columns)
+	recordBatch, err := arrow.NewRecordBatch(schema, 4, columns)
+	if err != nil {
+		log.Fatalf("Failed to create record batch #1: %v", err)
+	}
 	_, err = writer.WriteRecordBatch(recordBatch)
 	if err != nil {
 		log.Fatalf("Failed to write record batch #1: %v", err)
@@ -198,7 +201,10 @@ func main() {
 	for i, column := range columns {
 		slicedColumns[i] = column.Slice(1, 3)
 	}
-	recordBatch = arrow.NewRecordBatch(schema, 3, slicedColumns)
+	recordBatch, err = arrow.NewRecordBatch(schema, 3, slicedColumns)
+	if err != nil {
+		log.Fatalf("Failed to create record batch #2: %v", err)
+	}
 	writer.WriteRecordBatch(recordBatch)
 	_, err = writer.WriteRecordBatch(recordBatch)
 	if err != nil {
diff --git a/c_glib/test/test-record-batch.rb b/c_glib/test/test-record-batch.rb
index 9fd34b7d..325944b8 100644
--- a/c_glib/test/test-record-batch.rb
+++ b/c_glib/test/test-record-batch.rb
@@ -18,18 +18,32 @@
 class TestTable < Test::Unit::TestCase
   include Helper::Buildable
 
-  def test_new
-    fields = [
-      Arrow::Field.new("visible", Arrow::BooleanDataType.new),
-      Arrow::Field.new("valid", Arrow::BooleanDataType.new),
-    ]
-    schema = Arrow::Schema.new(fields)
-    columns = [
-      build_boolean_array([true]),
-      build_boolean_array([false]),
-    ]
-    record_batch = Arrow::RecordBatch.new(schema, 1, columns)
-    assert_equal(1, record_batch.n_rows)
+  sub_test_case(".new") do
+    def test_valid
+      fields = [
+        Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+        Arrow::Field.new("valid", Arrow::BooleanDataType.new),
+      ]
+      schema = Arrow::Schema.new(fields)
+      columns = [
+        build_boolean_array([true]),
+        build_boolean_array([false]),
+      ]
+      record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+      assert_equal(1, record_batch.n_rows)
+    end
+
+    def test_no_columns
+      fields = [
+        Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+      ]
+      schema = Arrow::Schema.new(fields)
+      message = "[record-batch][new]: " +
+        "Invalid: Number of columns did not match schema"
+      assert_raise(Arrow::Error::Invalid.new(message)) do
+        Arrow::RecordBatch.new(schema, 0, [])
+      end
+    end
   end
 
   sub_test_case("instance methods") do
@@ -40,7 +54,7 @@ class TestTable < Test::Unit::TestCase
       ]
       schema = Arrow::Schema.new(fields)
       columns = [
-        build_boolean_array([true, false, true, false, true, false]),
+        build_boolean_array([true, false, true, false, true]),
         build_boolean_array([false, true, false, true, false]),
       ]
       @record_batch = Arrow::RecordBatch.new(schema, 5, columns)
@@ -53,7 +67,7 @@ class TestTable < Test::Unit::TestCase
       ]
       schema = Arrow::Schema.new(fields)
       columns = [
-        build_boolean_array([true, false, true, false, true, false]),
+        build_boolean_array([true, false, true, false, true]),
         build_boolean_array([false, true, false, true, false]),
       ]
       other_record_batch = Arrow::RecordBatch.new(schema, 5, columns)
@@ -71,7 +85,7 @@ class TestTable < Test::Unit::TestCase
     end
 
     def test_columns
-      assert_equal([6, 5],
+      assert_equal([5, 5],
                    @record_batch.columns.collect(&:length))
     end
 
@@ -94,7 +108,7 @@ class TestTable < Test::Unit::TestCase
 
     def test_to_s
       assert_equal(<<-PRETTY_PRINT, @record_batch.to_s)
-visible: [true, false, true, false, true, false]
+visible: [true, false, true, false, true]
 valid: [false, true, false, true, false]
       PRETTY_PRINT
     end
-- 
2.15.0
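With this change, garrow_record_batch_new() reports validation failures through a GError instead of crashing. A minimal usage sketch, assuming schema, n_rows, and columns (a GList of GArrowArray) have been built elsewhere:

GError *error = NULL;
GArrowRecordBatch *record_batch =
    garrow_record_batch_new(schema, n_rows, columns, &error);
if (record_batch == NULL) {
  /* Validation failed, e.g.
     "[record-batch][new]: Invalid: Number of columns did not match schema" */
  g_print("failed to create record batch: %s\n", error->message);
  g_error_free(error);
} else {
  /* ... write the batch, then drop the reference ... */
  g_object_unref(record_batch);
}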
